package org.example.utils

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.cluster.{Broker, BrokerEndPoint}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.spark.streaming.kafka010.OffsetRange
import org.example.common.Logging
import org.example.constant.ApolloConst

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
 * ZooKeeper utility for reading and persisting Kafka consumer-group offsets.
 * @param buildProducer factory that lazily creates the underlying ZkClient on first use
 */
class ZkManager(buildProducer:() => ZkClient) extends Logging with Serializable {

  // Created lazily so a serialized ZkManager only opens its ZooKeeper connection
  // on first use (after deserialization on an executor, for example).
  lazy val zkClient: ZkClient = buildProducer()
  lazy val zkUtils: ZkUtils = ZkUtils.apply(zkClient, isZkSecurityEnabled = false)

  /**
   * Resolves the starting offset for every partition of the given topics.
   *
   * For each partition, the offset stored in ZooKeeper under the consumer group's
   * directory is clamped into Kafka's currently valid range [earliest, latest] to
   * avoid "offset out of range" failures after the job has been stopped for longer
   * than the topic's retention period.
   *
   * @param topics  topics to resolve offsets for
   * @param groupId consumer group whose ZooKeeper offset directory is consulted
   * @return mutable map of partition -> offset to start consuming from
   */
  def getBeginOffset(topics: Seq[String], groupId: String): mutable.HashMap[TopicPartition, Long] = {
    val fromOffsets = mutable.HashMap.empty[TopicPartition, Long]
    val partitionMap: mutable.Map[String, Seq[Int]] = zkUtils.getPartitionsForTopics(topics)
    partitionMap.foreach { case (topic, partitions) =>
      val topicDirs: ZKGroupTopicDirs = new ZKGroupTopicDirs(groupId, topic)
      partitions.foreach { partition: Int =>
        val tp = new TopicPartition(topic, partition)
        // Earliest and latest offsets currently available in Kafka for this partition.
        val kafkaOffset_left: Long = getOffsetForKafka(tp)
        val kafkaOffset_right: Long = getOffsetForKafka(tp, OffsetRequest.LatestTime)
        val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
        zkUtils.makeSurePersistentPathExists(zkPath)
        zkUtils.readDataMaybeNull(zkPath)._1 match {
          // On the very first run the path was just created with no data, so
          // readDataMaybeNull yields Some(null): start from the earliest offset.
          case Some(null) => fromOffsets += tp -> kafkaOffset_left
          case Some(zkOffset) =>
            // Compare the stored offset with Kafka's valid range to avoid
            // out-of-range errors when the job restarts after a long pause.
            val storedOffset = zkOffset.toLong
            if (storedOffset < kafkaOffset_left) {
              fromOffsets += tp -> kafkaOffset_left   // fell below the retention window
            } else if (storedOffset > kafkaOffset_right) {
              fromOffsets += tp -> kafkaOffset_right  // beyond the current log end
            } else {
              fromOffsets += tp -> storedOffset       // still valid, resume exactly here
            }
          case None =>
            fromOffsets += tp -> kafkaOffset_right
        }
        // was println: route diagnostics through the Logging mixin for consistency
        info("fromOffsets=" + fromOffsets)
      }
    }
    fromOffsets
  }

  /**
   * Manual override used for one-off repositioning: assigns a hard-coded offset
   * to partition 0 of every listed topic.
   *
   * NOTE(review): offset 4259851 and partition 0 are hard-coded — presumably a
   * one-time recovery value; confirm before reusing for other topics/partitions.
   *
   * @param topics  topics to assign the fixed offset to
   * @param groupId consumer group (currently unused by the lookup, kept for symmetry)
   * @return mutable map of partition 0 of each topic -> fixed offset
   */
  def getBeginOffsetByHand(topics: Seq[String], groupId: String): mutable.HashMap[TopicPartition, Long] = {
    val fromOffsets = mutable.HashMap.empty[TopicPartition, Long]
    val partitionMap = zkUtils.getPartitionsForTopics(topics)
    partitionMap.foreach { topicPartitions =>
      val topic = topicPartitions._1
      fromOffsets += new TopicPartition(topic, 0) -> 4259851
    }
    fromOffsets
  }

  /**
   * Fetches a single offset for a partition directly from its leader broker.
   *
   * @param topicPartition partition to query
   * @param time           OffsetRequest.EarliestTime (default) or OffsetRequest.LatestTime
   * @return the requested offset
   * @throws NoSuchElementException if the partition has no elected leader or the
   *                                broker metadata is missing (via Option.get below)
   */
  def getOffsetForKafka(topicPartition: TopicPartition, time: Long = OffsetRequest.EarliestTime): Long = {
    // Locate the partition's leader broker via ZooKeeper.
    // NOTE(review): .get throws a bare NoSuchElementException when no leader is
    // elected — callers may rely on that exception type, so it is left as-is.
    val brokerId: Int = zkUtils.getLeaderForPartition(topicPartition.topic, topicPartition.partition).get
    val broker: Broker = zkUtils.getBrokerInfo(brokerId).get
    val endpoint: BrokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)
    val consumer = new SimpleConsumer(endpoint.host, endpoint.port, 10000, 100000, "getMinOffset")
    try {
      val tp = TopicAndPartition(topicPartition.topic, topicPartition.partition)
      val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(time, 1)))
      consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.head
    } finally {
      // Fix: the SimpleConsumer (and its TCP connection) was previously leaked
      // on every call; always close it, even when the request throws.
      consumer.close()
    }
  }

  /**
   * Persists the end ("until") offset of each processed range back to ZooKeeper
   * under the consumer group's offset directory, creating the node if needed.
   *
   * @param offsetRanges ranges just processed by the streaming batch
   * @param groupId      consumer group whose ZooKeeper directory is updated
   */
  def saveEndOffset(offsetRanges: ArrayBuffer[OffsetRange], groupId: String): Unit = {
    offsetRanges.foreach { offsetRange: OffsetRange =>
      val topicDirs = new ZKGroupTopicDirs(groupId, offsetRange.topic)
      val zkPath = s"${topicDirs.consumerOffsetDir}/${offsetRange.partition}"
      zkUtils.updatePersistentPath(zkPath, offsetRange.untilOffset.toString)
      // was println: route diagnostics through the Logging mixin for consistency
      info(offsetRange.topic + ":" + offsetRange.partition + ":" + offsetRange.untilOffset.toString)
    }
  }

}

object ZkManager extends Logging {

  /**
   * Builds a ZkManager whose ZkClient connects lazily to the given ZooKeeper
   * quorum and is closed automatically by a JVM shutdown hook.
   *
   * @param zkServer ZooKeeper connection string
   * @return a ZkManager backed by a lazily created client
   */
  def apply(zkServer: String): ZkManager = {
    val clientFactory: () => ZkClient = () => {
      val client: ZkClient = ZkUtils.createZkClient(zkServer, 30000, 30000)
      sys.addShutdownHook {
        info("Execute hook thread: " + this)
        client.close()
      }
      client
    }
    new ZkManager(clientFactory)
  }

  /**
   * Entry point for manually resetting a consumer group's offsets in ZooKeeper:
   * writes untilOffset=1 for partition 0 of "alarm_details_topic" under group
   * "WarnAttachment".
   */
  def main(args: Array[String]): Unit = {
    // Hand-crafted range describing the offset to persist.
    val manualRanges = ArrayBuffer(OffsetRange.create("alarm_details_topic", 0, 0, 1))

    ZkManager(ApolloConst.zkKafka).saveEndOffset(manualRanges, "WarnAttachment")
  }
}
